// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "olap/rowset/beta_rowset_writer.h"

#include <assert.h>
// IWYU pragma: no_include <bthread/errno.h>
#include <errno.h> // IWYU pragma: keep
#include <fmt/format.h>
#include <stdio.h>

#include <ctime> // time
#include <filesystem>
#include <memory>
#include <mutex>
#include <sstream>
#include <utility>

// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/segcompaction.h"
#include "olap/rowset/segment_v2/inverted_index_cache.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/rowset/segment_v2/segment_writer.h"
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
#include "olap/tablet_schema.h"
#include "runtime/thread_context.h"
#include "util/debug_points.h"
#include "util/pretty_printer.h"
#include "util/slice.h"
#include "util/stopwatch.hpp"
#include "util/time.h"
#include "vec/columns/column.h"
#include "vec/common/schema_util.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type_factory.hpp"

namespace doris {
#include "common/compile_check_begin.h"
using namespace ErrorCode;

namespace {

bool is_segment_overlapping(const std::vector<KeyBoundsPB>& segments_encoded_key_bounds) {
    std::string_view last;
    for (auto&& segment_encode_key : segments_encoded_key_bounds) {
        auto&& cur_min = segment_encode_key.min_key();
        auto&& cur_max = segment_encode_key.max_key();
        if (cur_min <= last) {
            return true;
        }
        last = cur_max;
    }
    return false;
}

void build_rowset_meta_with_spec_field(RowsetMeta& rowset_meta,
                                       const RowsetMeta& spec_rowset_meta) {
    rowset_meta.set_num_rows(spec_rowset_meta.num_rows());
    rowset_meta.set_total_disk_size(spec_rowset_meta.total_disk_size());
    rowset_meta.set_data_disk_size(spec_rowset_meta.data_disk_size());
    rowset_meta.set_index_disk_size(spec_rowset_meta.index_disk_size());
    // TODO write zonemap to meta
    rowset_meta.set_empty(spec_rowset_meta.num_rows() == 0);
    rowset_meta.set_creation_time(time(nullptr));
    rowset_meta.set_num_segments(spec_rowset_meta.num_segments());
    rowset_meta.set_segments_overlap(spec_rowset_meta.segments_overlap());
    rowset_meta.set_rowset_state(spec_rowset_meta.rowset_state());
    rowset_meta.set_segments_key_bounds_truncated(
            spec_rowset_meta.is_segments_key_bounds_truncated());
    std::vector<KeyBoundsPB> segments_key_bounds;
    spec_rowset_meta.get_segments_key_bounds(&segments_key_bounds);
    rowset_meta.set_segments_key_bounds(segments_key_bounds);
}

} // namespace

SegmentFileCollection::~SegmentFileCollection() = default;

Status SegmentFileCollection::add(int seg_id, io::FileWriterPtr&& writer) {
    std::lock_guard lock(_lock);
    if (_closed) [[unlikely]] {
        DCHECK(false) << writer->path();
        return Status::InternalError("add to closed SegmentFileCollection");
    }

    _file_writers.emplace(seg_id, std::move(writer));
    return Status::OK();
}

io::FileWriter* SegmentFileCollection::get(int seg_id) const {
    std::lock_guard lock(_lock);
    if (auto it = _file_writers.find(seg_id); it != _file_writers.end()) {
        return it->second.get();
    } else {
        return nullptr;
    }
}

Status SegmentFileCollection::close() {
    {
        std::lock_guard lock(_lock);
        if (_closed) [[unlikely]] {
            DCHECK(false);
            return Status::InternalError("double close SegmentFileCollection");
        }
        _closed = true;
    }

    for (auto&& [_, writer] : _file_writers) {
        if (writer->state() != io::FileWriter::State::CLOSED) {
            RETURN_IF_ERROR(writer->close());
        }
    }

    return Status::OK();
}

Result<std::vector<size_t>> SegmentFileCollection::segments_file_size(int seg_id_offset) {
    std::lock_guard lock(_lock);
    if (!_closed) [[unlikely]] {
        DCHECK(false);
        return ResultError(Status::InternalError("get segments file size without closed"));
    }

    Status st;
    std::vector<size_t> seg_file_size(_file_writers.size(), 0);
    bool succ = std::all_of(_file_writers.begin(), _file_writers.end(), [&](auto&& it) {
        auto&& [seg_id, writer] = it;

        int idx = seg_id - seg_id_offset;
        if (idx >= seg_file_size.size()) [[unlikely]] {
            auto err_msg = fmt::format(
                    "invalid seg_id={} num_file_writers={} seg_id_offset={} path={}", seg_id,
                    seg_file_size.size(), seg_id_offset, writer->path().native());
            DCHECK(false) << err_msg;
            st = Status::InternalError(err_msg);
            return false;
        }

        auto& fsize = seg_file_size[idx];
        if (fsize != 0) {
            // File size should not been set
            auto err_msg =
                    fmt::format("duplicate seg_id={} path={}", seg_id, writer->path().native());
            DCHECK(false) << err_msg;
            st = Status::InternalError(err_msg);
            return false;
        }

        fsize = writer->bytes_appended();
        if (fsize <= 0) {
            auto err_msg =
                    fmt::format("invalid segment fsize={} path={}", fsize, writer->path().native());
            DCHECK(false) << err_msg;
            st = Status::InternalError(err_msg);
            return false;
        }

        return true;
    });

    if (succ) {
        return seg_file_size;
    }

    return ResultError(st);
}

InvertedIndexFileCollection::~InvertedIndexFileCollection() = default;

Status InvertedIndexFileCollection::add(int seg_id, IndexFileWriterPtr&& index_writer) {
    std::lock_guard lock(_lock);
    if (_inverted_index_file_writers.find(seg_id) != _inverted_index_file_writers.end())
            [[unlikely]] {
        DCHECK(false);
        return Status::InternalError("The seg_id already exists, seg_id is {}", seg_id);
    }
    _inverted_index_file_writers.emplace(seg_id, std::move(index_writer));
    return Status::OK();
}

Status InvertedIndexFileCollection::close() {
    std::lock_guard lock(_lock);
    for (auto&& [id, writer] : _inverted_index_file_writers) {
        RETURN_IF_ERROR(writer->close());
        _total_size += writer->get_index_file_total_size();
    }

    return Status::OK();
}

Result<std::vector<const InvertedIndexFileInfo*>>
InvertedIndexFileCollection::inverted_index_file_info(int seg_id_offset) {
    std::lock_guard lock(_lock);

    Status st;
    std::vector<const InvertedIndexFileInfo*> idx_file_info(_inverted_index_file_writers.size());
    bool succ = std::all_of(
            _inverted_index_file_writers.begin(), _inverted_index_file_writers.end(),
            [&](auto&& it) {
                auto&& [seg_id, writer] = it;

                int idx = seg_id - seg_id_offset;
                if (idx >= idx_file_info.size()) [[unlikely]] {
                    auto err_msg =
                            fmt::format("invalid seg_id={} num_file_writers={} seg_id_offset={}",
                                        seg_id, idx_file_info.size(), seg_id_offset);
                    DCHECK(false) << err_msg;
                    st = Status::InternalError(err_msg);
                    return false;
                }
                idx_file_info[idx] = _inverted_index_file_writers[seg_id]->get_index_file_info();
                return true;
            });

    if (succ) {
        return idx_file_info;
    }

    return ResultError(st);
}

BaseBetaRowsetWriter::BaseBetaRowsetWriter()
        : _num_segment(0),
          _segment_start_id(0),
          _num_rows_written(0),
          _total_data_size(0),
          _total_index_size(0),
          _segment_creator(_context, _seg_files, _idx_files) {}

BetaRowsetWriter::BetaRowsetWriter(StorageEngine& engine)
        : _engine(engine), _segcompaction_worker(std::make_shared<SegcompactionWorker>(this)) {}

BaseBetaRowsetWriter::~BaseBetaRowsetWriter() {
    if (!_already_built && _rowset_meta->is_local()) {
        // abnormal exit, remove all files generated
        auto& fs = io::global_local_filesystem();
        for (int i = _segment_start_id; i < _segment_creator.next_segment_id(); ++i) {
            std::string seg_path =
                    local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), i);
            // Even if an error is encountered, these files that have not been cleaned up
            // will be cleaned up by the GC background. So here we only print the error
            // message when we encounter an error.
            WARN_IF_ERROR(fs->delete_file(seg_path),
                          fmt::format("Failed to delete file={}", seg_path));
        }
    }
}

BetaRowsetWriter::~BetaRowsetWriter() {
    /* Note that segcompaction is async and in parallel with load job. So we should handle carefully
     * when the job is cancelled. Although it is meaningless to continue segcompaction when the job
     * is cancelled, the objects involved in the job should be preserved during segcompaction to
     * avoid crashs for memory issues. */
    WARN_IF_ERROR(_wait_flying_segcompaction(), "segment compaction failed");

    if (_calc_delete_bitmap_token != nullptr) {
        _calc_delete_bitmap_token->cancel();
    }
}

Status BaseBetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) {
    _context = rowset_writer_context;
    _rowset_meta.reset(new RowsetMeta);
    if (_context.storage_resource) {
        _rowset_meta->set_remote_storage_resource(*_context.storage_resource);
    }
    _rowset_meta->set_rowset_id(_context.rowset_id);
    _rowset_meta->set_partition_id(_context.partition_id);
    _rowset_meta->set_tablet_id(_context.tablet_id);
    _rowset_meta->set_tablet_schema_hash(_context.tablet_schema_hash);
    _rowset_meta->set_rowset_type(_context.rowset_type);
    _rowset_meta->set_rowset_state(_context.rowset_state);
    _rowset_meta->set_segments_overlap(_context.segments_overlap);
    if (_context.rowset_state == PREPARED || _context.rowset_state == COMMITTED) {
        _is_pending = true;
        _rowset_meta->set_txn_id(_context.txn_id);
        _rowset_meta->set_load_id(_context.load_id);
    } else {
        _rowset_meta->set_version(_context.version);
        _rowset_meta->set_newest_write_timestamp(_context.newest_write_timestamp);
    }
    _rowset_meta->set_tablet_uid(_context.tablet_uid);
    _rowset_meta->set_tablet_schema(_context.tablet_schema);
    _context.segment_collector = std::make_shared<SegmentCollectorT<BaseBetaRowsetWriter>>(this);
    _context.file_writer_creator = std::make_shared<FileWriterCreatorT<BaseBetaRowsetWriter>>(this);
    return Status::OK();
}

Status BaseBetaRowsetWriter::add_block(const vectorized::Block* block) {
    return _segment_creator.add_block(block);
}

Status BaseBetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
    SCOPED_RAW_TIMER(&_delete_bitmap_ns);
    if (!_context.tablet->enable_unique_key_merge_on_write() ||
        (_context.partial_update_info && _context.partial_update_info->is_partial_update())) {
        return Status::OK();
    }
    RowsetSharedPtr rowset_ptr;
    RETURN_IF_ERROR(_build_tmp(rowset_ptr));
    auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset_ptr.get());
    std::vector<segment_v2::SegmentSharedPtr> segments;
    RETURN_IF_ERROR(beta_rowset->load_segments(segment_id, segment_id + 1, &segments));
    std::vector<RowsetSharedPtr> specified_rowsets;
    {
        std::shared_lock meta_rlock(_context.tablet->get_header_lock());
        specified_rowsets =
                _context.tablet->get_rowset_by_ids(_context.mow_context->rowset_ids.get());
    }
    OlapStopWatch watch;
    auto finish_callback = [this](segment_v2::SegmentSharedPtr segment, Status st) {
        auto* task = calc_delete_bitmap_task(segment->id());
        task->set_status(st);
    };
    RETURN_IF_ERROR(BaseTablet::calc_delete_bitmap(
            _context.tablet, rowset_ptr, segments, specified_rowsets,
            _context.mow_context->delete_bitmap, _context.mow_context->max_version,
            _calc_delete_bitmap_token.get(), nullptr, nullptr, std::move(finish_callback)));
    size_t total_rows = std::accumulate(
            segments.begin(), segments.end(), 0,
            [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); });
    LOG(INFO) << "[Memtable Flush] construct delete bitmap tablet: " << _context.tablet->tablet_id()
              << ", rowset_ids: " << _context.mow_context->rowset_ids->size()
              << ", cur max_version: " << _context.mow_context->max_version
              << ", transaction_id: " << _context.mow_context->txn_id << ", delete_bitmap_count: "
              << _context.mow_context->delete_bitmap->get_delete_bitmap_count()
              << ", delete_bitmap_cardinality: "
              << _context.mow_context->delete_bitmap->cardinality()
              << ", cost: " << watch.get_elapse_time_us() << "(us), total rows: " << total_rows;
    return Status::OK();
}

Status BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) {
    RETURN_IF_ERROR(BaseBetaRowsetWriter::init(rowset_writer_context));
    if (_segcompaction_worker) {
        _segcompaction_worker->init_mem_tracker(rowset_writer_context);
    }
    if (_context.mow_context != nullptr) {
        _calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor_for_load()->create_token();
    }
    return Status::OK();
}

Status BetaRowsetWriter::_load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment,
                                                    int32_t segment_id) {
    DCHECK(_rowset_meta->is_local());
    auto fs = _rowset_meta->fs();
    if (!fs) {
        return Status::Error<INIT_FAILED>(
                "BetaRowsetWriter::_load_noncompacted_segment _rowset_meta->fs get failed");
    }
    auto path =
            local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), segment_id);
    io::FileReaderOptions reader_options {
            .cache_type =
                    _context.write_file_cache
                            ? (config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE
                                                         : io::FileCachePolicy::NO_CACHE)
                            : io::FileCachePolicy::NO_CACHE,
            .is_doris_table = true,
            .cache_base_path {},
    };
    auto s = segment_v2::Segment::open(fs, path, _rowset_meta->tablet_id(), segment_id, rowset_id(),
                                       _context.tablet_schema, reader_options, &segment);
    if (!s.ok()) {
        LOG(WARNING) << "failed to open segment. " << path << ":" << s;
        return s;
    }
    return Status::OK();
}

/* policy of segcompaction target selection:
 *  1. skip big segments
 *  2. if the consecutive smalls end up with a big, compact the smalls, except
 *     single small
 *  3. if the consecutive smalls end up with small, compact the smalls if the
 *     length is beyond (config::segcompaction_batch_size / 2)
 */
Status BetaRowsetWriter::_find_longest_consecutive_small_segment(
        SegCompactionCandidatesSharedPtr& segments) {
    segments = std::make_shared<SegCompactionCandidates>();
    // skip last (maybe active) segment
    int32_t last_segment = _num_segment - 1;
    size_t task_bytes = 0;
    uint32_t task_rows = 0;
    int32_t segid;
    for (segid = _segcompacted_point;
         segid < last_segment && segments->size() < config::segcompaction_batch_size; segid++) {
        segment_v2::SegmentSharedPtr segment;
        RETURN_IF_ERROR(_load_noncompacted_segment(segment, segid));
        const auto segment_rows = segment->num_rows();
        const auto segment_bytes = segment->file_reader()->size();
        bool is_large_segment = segment_rows > config::segcompaction_candidate_max_rows ||
                                segment_bytes > config::segcompaction_candidate_max_bytes;
        if (is_large_segment) {
            if (segid == _segcompacted_point) {
                // skip large segments at the front
                auto dst_seg_id = _num_segcompacted.load();
                RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
                if (_segcompaction_worker->need_convert_delete_bitmap()) {
                    RETURN_IF_ERROR(_segcompaction_worker->convert_segment_delete_bitmap(
                            segment, _context.mow_context->delete_bitmap, segid, dst_seg_id));
                }
                continue;
            } else {
                // stop because we need consecutive segments
                break;
            }
        }
        bool is_task_full = task_rows + segment_rows > config::segcompaction_task_max_rows ||
                            task_bytes + segment_bytes > config::segcompaction_task_max_bytes;
        if (is_task_full) {
            break;
        }
        segments->push_back(segment);
        task_rows += segment->num_rows();
        task_bytes += segment->file_reader()->size();
    }
    size_t s = segments->size();
    if (segid == last_segment && s <= (config::segcompaction_batch_size / 2)) {
        // we didn't collect enough segments, better to do it in next
        // round to compact more at once
        segments->clear();
        return Status::OK();
    }
    if (s == 1) { // poor bachelor, let it go
        VLOG_DEBUG << "only one candidate segment";
        auto src_seg_id = _segcompacted_point.load();
        auto dst_seg_id = _num_segcompacted.load();
        segment_v2::SegmentSharedPtr segment;
        RETURN_IF_ERROR(_load_noncompacted_segment(segment, src_seg_id));
        RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
        if (_segcompaction_worker->need_convert_delete_bitmap()) {
            RETURN_IF_ERROR(_segcompaction_worker->convert_segment_delete_bitmap(
                    segment, _context.mow_context->delete_bitmap, src_seg_id, dst_seg_id));
        }
        segments->clear();
        return Status::OK();
    }
    if (VLOG_DEBUG_IS_ON) {
        vlog_buffer.clear();
        for (auto& segment : (*segments.get())) {
            fmt::format_to(vlog_buffer, "[id:{} num_rows:{}]", segment->id(), segment->num_rows());
        }
        VLOG_DEBUG << "candidate segments num:" << s
                   << " list of candidates:" << fmt::to_string(vlog_buffer);
    }
    return Status::OK();
}

Status BetaRowsetWriter::_rename_compacted_segments(int64_t begin, int64_t end) {
    int ret;
    auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
                                                                    _context.rowset_id, begin, end);
    auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(),
                                           _num_segcompacted);
    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
    if (ret) {
        return Status::Error<ROWSET_RENAME_FILE_FAILED>(
                "failed to rename {} to {}. ret:{}, errno:{}", src_seg_path, dst_seg_path, ret,
                errno);
    }

    // rename inverted index files
    RETURN_IF_ERROR(_rename_compacted_indices(begin, end, 0));

    _num_segcompacted++;
    return Status::OK();
}

void BetaRowsetWriter::_clear_statistics_for_deleting_segments_unsafe(uint32_t begin,
                                                                      uint32_t end) {
    VLOG_DEBUG << "_segid_statistics_map clear record segid range from:" << begin << " to:" << end;
    for (uint32_t i = begin; i <= end; ++i) {
        _segid_statistics_map.erase(i);
    }
}

Status BetaRowsetWriter::_rename_compacted_segment_plain(uint32_t seg_id) {
    if (seg_id == _num_segcompacted) {
        ++_num_segcompacted;
        return Status::OK();
    }

    auto src_seg_path =
            local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), seg_id);
    auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(),
                                           _num_segcompacted);
    VLOG_DEBUG << "segcompaction skip this segment. rename " << src_seg_path << " to "
               << dst_seg_path;
    {
        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
        DCHECK_EQ(_segid_statistics_map.find(seg_id) == _segid_statistics_map.end(), false);
        DCHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(),
                  true);
        auto org = _segid_statistics_map[seg_id];
        _segid_statistics_map.emplace(_num_segcompacted, org);
        _clear_statistics_for_deleting_segments_unsafe(seg_id, seg_id);
    }
    int ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
    if (ret) {
        return Status::Error<ROWSET_RENAME_FILE_FAILED>(
                "failed to rename {} to {}. ret:{}, errno:{}", src_seg_path, dst_seg_path, ret,
                errno);
    }
    // rename remaining inverted index files
    RETURN_IF_ERROR(_rename_compacted_indices(-1, -1, seg_id));

    ++_num_segcompacted;
    return Status::OK();
}

Status BetaRowsetWriter::_rename_compacted_indices(int64_t begin, int64_t end, uint64_t seg_id) {
    int ret;

    auto src_seg_path = begin < 0 ? local_segment_path(_context.tablet_path,
                                                       _context.rowset_id.to_string(), seg_id)
                                  : BetaRowset::local_segment_path_segcompacted(
                                            _context.tablet_path, _context.rowset_id, begin, end);
    auto src_index_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(src_seg_path);
    auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(),
                                           _num_segcompacted);
    auto dst_index_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(dst_seg_path);

    if (_context.tablet_schema->get_inverted_index_storage_format() >=
        InvertedIndexStorageFormatPB::V2) {
        if (_context.tablet_schema->has_inverted_index() ||
            _context.tablet_schema->has_ann_index()) {
            auto src_idx_path =
                    InvertedIndexDescriptor::get_index_file_path_v2(src_index_path_prefix);
            auto dst_idx_path =
                    InvertedIndexDescriptor::get_index_file_path_v2(dst_index_path_prefix);

            ret = rename(src_idx_path.c_str(), dst_idx_path.c_str());
            if (ret) {
                return Status::Error<ROWSET_RENAME_FILE_FAILED>(
                        "failed to rename {} to {}. ret:{}, errno:{}", src_idx_path, dst_idx_path,
                        ret, errno);
            }
        }
    }
    // rename remaining inverted index files
    for (auto column : _context.tablet_schema->columns()) {
        auto index_infos = _context.tablet_schema->inverted_indexs(*column);
        for (const auto& index_info : index_infos) {
            auto index_id = index_info->index_id();
            if (_context.tablet_schema->get_inverted_index_storage_format() ==
                InvertedIndexStorageFormatPB::V1) {
                auto src_idx_path = InvertedIndexDescriptor::get_index_file_path_v1(
                        src_index_path_prefix, index_id, index_info->get_index_suffix());
                auto dst_idx_path = InvertedIndexDescriptor::get_index_file_path_v1(
                        dst_index_path_prefix, index_id, index_info->get_index_suffix());
                VLOG_DEBUG << "segcompaction skip this index. rename " << src_idx_path << " to "
                           << dst_idx_path;
                ret = rename(src_idx_path.c_str(), dst_idx_path.c_str());
                if (ret) {
                    return Status::Error<INVERTED_INDEX_RENAME_FILE_FAILED>(
                            "failed to rename {} to {}. ret:{}, errno:{}", src_idx_path,
                            dst_idx_path, ret, errno);
                }
            }
            // Erase the origin index file cache
            auto src_idx_cache_key = InvertedIndexDescriptor::get_index_file_cache_key(
                    src_index_path_prefix, index_id, index_info->get_index_suffix());
            auto dst_idx_cache_key = InvertedIndexDescriptor::get_index_file_cache_key(
                    dst_index_path_prefix, index_id, index_info->get_index_suffix());
            RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->erase(src_idx_cache_key));
            RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->erase(dst_idx_cache_key));
        }
    }
    return Status::OK();
}

Status BetaRowsetWriter::_segcompaction_if_necessary() {
    Status status = Status::OK();
    // if not doing segcompaction, just check segment number
    if (!config::enable_segcompaction || !_context.enable_segcompaction ||
        _context.tablet_schema->num_variant_columns() > 0) {
        return _check_segment_number_limit(_num_segment);
    }
    // leave _is_doing_segcompaction as the last condition
    // otherwise _segcompacting_cond will never get notified
    if (_is_doing_segcompaction.exchange(true)) {
        return status;
    }
    if (_segcompaction_status.load() != OK) {
        status = Status::Error<SEGCOMPACTION_FAILED>(
                "BetaRowsetWriter::_segcompaction_if_necessary meet invalid state, error code: {}",
                _segcompaction_status.load());
    } else {
        status = _check_segment_number_limit(_num_segcompacted);
    }
    if (status.ok() && (_num_segment - _segcompacted_point) >= config::segcompaction_batch_size) {
        SegCompactionCandidatesSharedPtr segments;
        status = _find_longest_consecutive_small_segment(segments);
        if (LIKELY(status.ok()) && (!segments->empty())) {
            LOG(INFO) << "submit segcompaction task, tablet_id:" << _context.tablet_id
                      << " rowset_id:" << _context.rowset_id << " segment num:" << _num_segment
                      << ", segcompacted_point:" << _segcompacted_point;
            status = _engine.submit_seg_compaction_task(_segcompaction_worker, segments);
            if (status.ok()) {
                return status;
            }
        }
    }
    {
        std::lock_guard lk(_is_doing_segcompaction_lock);
        _is_doing_segcompaction = false;
        _segcompacting_cond.notify_all();
    }
    return status;
}

Status BetaRowsetWriter::_segcompaction_rename_last_segments() {
    DCHECK_EQ(_is_doing_segcompaction, false);
    if (!config::enable_segcompaction) {
        return Status::OK();
    }
    if (_segcompaction_status.load() != OK) {
        return Status::Error<SEGCOMPACTION_FAILED>(
                "BetaRowsetWriter::_segcompaction_rename_last_segments meet invalid state, error "
                "code: {}",
                _segcompaction_status.load());
    }
    if (!is_segcompacted() || _segcompacted_point == _num_segment) {
        // no need if never segcompact before or all segcompacted
        return Status::OK();
    }
    // currently we only rename remaining segments to reduce wait time
    // so that transaction can be committed ASAP
    VLOG_DEBUG << "segcompaction last few segments";
    for (int32_t segid = _segcompacted_point; segid < _num_segment; segid++) {
        auto dst_segid = _num_segcompacted.load();
        segment_v2::SegmentSharedPtr segment;
        RETURN_IF_ERROR(_load_noncompacted_segment(segment, segid));
        RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
        if (_segcompaction_worker->need_convert_delete_bitmap()) {
            RETURN_IF_ERROR(_segcompaction_worker->convert_segment_delete_bitmap(
                    segment, _context.mow_context->delete_bitmap, segid, dst_segid));
        }
    }
    return Status::OK();
}

Status BaseBetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
    assert(rowset->rowset_meta()->rowset_type() == BETA_ROWSET);
    RETURN_IF_ERROR(rowset->link_files_to(_context.tablet_path, _context.rowset_id));
    _num_rows_written += rowset->num_rows();
    const auto& rowset_meta = rowset->rowset_meta();
    auto index_size = rowset_meta->index_disk_size();
    auto total_size = rowset_meta->total_disk_size();
    auto data_size = rowset_meta->data_disk_size();
    // corrupted index size caused by bug before 2.1.5 or 3.0.0 version
    // try to get real index size from disk.
    if (index_size < 0 || index_size > total_size * 2) {
        LOG(ERROR) << "invalid index size:" << index_size << " total size:" << total_size
                   << " data size:" << data_size << " tablet:" << rowset_meta->tablet_id()
                   << " rowset:" << rowset_meta->rowset_id();
        index_size = 0;
        auto st = rowset->get_inverted_index_size(&index_size);
        if (!st.ok()) {
            if (!st.is<NOT_FOUND>()) {
                LOG(ERROR) << "failed to get inverted index size. res=" << st;
                return st;
            }
        }
    }
    _total_data_size += data_size;
    _total_index_size += index_size;
    _num_segment += cast_set<int32_t>(rowset->num_segments());
    // append key_bounds to current rowset
    RETURN_IF_ERROR(rowset->get_segments_key_bounds(&_segments_encoded_key_bounds));
    _segments_key_bounds_truncated = rowset->rowset_meta()->is_segments_key_bounds_truncated();

    // TODO update zonemap
    if (rowset->rowset_meta()->has_delete_predicate()) {
        _rowset_meta->set_delete_predicate(rowset->rowset_meta()->delete_predicate());
    }
    // Update the tablet schema in the rowset metadata if the tablet schema contains a variant.
    // During the build process, _context.tablet_schema will be used as the rowset schema.
    // This situation may arise in the event of a linked schema change. If this schema is not set,
    // the subcolumns of the variant will be lost.
    if (_context.tablet_schema->num_variant_columns() > 0 && rowset->tablet_schema() != nullptr) {
        _context.tablet_schema = rowset->tablet_schema();
    }
    return Status::OK();
}

Status BaseBetaRowsetWriter::add_rowset_for_linked_schema_change(RowsetSharedPtr rowset) {
    // TODO use schema_mapping to transfer zonemap
    return add_rowset(rowset);
}

Status BaseBetaRowsetWriter::flush() {
    return _segment_creator.flush();
}

Status BaseBetaRowsetWriter::flush_memtable(vectorized::Block* block, int32_t segment_id,
                                            int64_t* flush_size) {
    if (block->rows() == 0) {
        return Status::OK();
    }

    {
        SCOPED_RAW_TIMER(&_segment_writer_ns);
        RETURN_IF_ERROR(_segment_creator.flush_single_block(block, segment_id, flush_size));
    }
    return Status::OK();
}

Status BaseBetaRowsetWriter::flush_single_block(const vectorized::Block* block) {
    return _segment_creator.flush_single_block(block);
}

Status BetaRowsetWriter::_wait_flying_segcompaction() {
    std::unique_lock<std::mutex> l(_is_doing_segcompaction_lock);
    uint64_t begin_wait = GetCurrentTimeMicros();
    while (_is_doing_segcompaction) {
        // change sync wait to async?
        _segcompacting_cond.wait(l);
    }
    uint64_t elapsed = GetCurrentTimeMicros() - begin_wait;
    if (elapsed >= MICROS_PER_SEC) {
        LOG(INFO) << "wait flying segcompaction finish time:" << elapsed << "us";
    }
    if (_segcompaction_status.load() != OK) {
        return Status::Error<SEGCOMPACTION_FAILED>(
                "BetaRowsetWriter meet invalid state, error code: {}",
                _segcompaction_status.load());
    }
    return Status::OK();
}

RowsetSharedPtr BaseBetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& spec_rowset_meta) {
    if (_rowset_meta->newest_write_timestamp() == -1) {
        _rowset_meta->set_newest_write_timestamp(UnixSeconds());
    }

    build_rowset_meta_with_spec_field(*_rowset_meta, *spec_rowset_meta);
    RowsetSharedPtr rowset;
    auto status = RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path,
                                               _rowset_meta, &rowset);
    if (!status.ok()) {
        LOG(WARNING) << "rowset init failed when build new rowset, res=" << status;
        return nullptr;
    }
    _already_built = true;
    return rowset;
}

Status BaseBetaRowsetWriter::_close_file_writers() {
    // Flush and close segment files
    RETURN_NOT_OK_STATUS_WITH_WARN(_segment_creator.close(),
                                   "failed to close segment creator when build new rowset");
    return Status::OK();
}

Status BetaRowsetWriter::_close_file_writers() {
    RETURN_IF_ERROR(BaseBetaRowsetWriter::_close_file_writers());
    // if _segment_start_id is not zero, that means it's a transient rowset writer for
    // MoW partial update, don't need to do segment compaction.
    if (_segment_start_id == 0) {
        if (_segcompaction_worker->cancel()) {
            std::lock_guard lk(_is_doing_segcompaction_lock);
            _is_doing_segcompaction = false;
            _segcompacting_cond.notify_all();
        } else {
            RETURN_NOT_OK_STATUS_WITH_WARN(_wait_flying_segcompaction(),
                                           "segcompaction failed when build new rowset");
        }
        RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_rename_last_segments(),
                                       "rename last segments failed when build new rowset");
        // segcompaction worker would do file wrier's close function in compact_segments
        if (auto& seg_comp_file_writer = _segcompaction_worker->get_file_writer();
            nullptr != seg_comp_file_writer &&
            seg_comp_file_writer->state() != io::FileWriter::State::CLOSED) {
            RETURN_NOT_OK_STATUS_WITH_WARN(seg_comp_file_writer->close(),
                                           "close segment compaction worker failed");
        }
        // process delete bitmap for mow table
        if (is_segcompacted() && _segcompaction_worker->need_convert_delete_bitmap()) {
            auto converted_delete_bitmap = _segcompaction_worker->get_converted_delete_bitmap();
            // which means the segment compaction is triggerd
            if (converted_delete_bitmap != nullptr) {
                RowsetIdUnorderedSet rowsetids;
                rowsetids.insert(rowset_id());
                context().tablet->add_sentinel_mark_to_delete_bitmap(converted_delete_bitmap.get(),
                                                                     rowsetids);
                context().mow_context->delete_bitmap->remove({rowset_id(), 0, 0},
                                                             {rowset_id(), UINT32_MAX, INT64_MAX});
                context().mow_context->delete_bitmap->merge(*converted_delete_bitmap);
            }
        }
    }
    return Status::OK();
}

Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) {
    if (_calc_delete_bitmap_token != nullptr) {
        RETURN_IF_ERROR(_calc_delete_bitmap_token->wait());
    }
    RETURN_IF_ERROR(_close_file_writers());
    const auto total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted;
    RETURN_NOT_OK_STATUS_WITH_WARN(_check_segment_number_limit(total_segment_num),
                                   "too many segments when build new rowset");
    RETURN_IF_ERROR(_build_rowset_meta(_rowset_meta.get(), true));
    if (_is_pending) {
        _rowset_meta->set_rowset_state(COMMITTED);
    } else {
        _rowset_meta->set_rowset_state(VISIBLE);
    }

    if (_rowset_meta->newest_write_timestamp() == -1) {
        _rowset_meta->set_newest_write_timestamp(UnixSeconds());
    }

    _rowset_meta->set_tablet_schema(_context.tablet_schema);

    // If segment compaction occurs, the idx file info will become inaccurate.
    if ((_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) &&
        _num_segcompacted == 0) {
        if (auto idx_files_info = _idx_files.inverted_index_file_info(_segment_start_id);
            !idx_files_info.has_value()) [[unlikely]] {
            LOG(ERROR) << "expected inverted index files info, but none presents: "
                       << idx_files_info.error();
        } else {
            _rowset_meta->add_inverted_index_files_info(idx_files_info.value());
        }
    }

    RETURN_NOT_OK_STATUS_WITH_WARN(
            RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, _rowset_meta,
                                         &rowset),
            "rowset init failed when build new rowset");
    _already_built = true;
    return Status::OK();
}

int64_t BaseBetaRowsetWriter::_num_seg() const {
    return _num_segment;
}

int64_t BetaRowsetWriter::_num_seg() const {
    return is_segcompacted() ? _num_segcompacted : _num_segment;
}

Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool check_segment_num) {
    int64_t num_rows_written = 0;
    int64_t total_data_size = 0;
    int64_t total_index_size = 0;
    std::vector<KeyBoundsPB> segments_encoded_key_bounds;
    {
        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
        for (const auto& itr : _segid_statistics_map) {
            num_rows_written += itr.second.row_num;
            total_data_size += itr.second.data_size;
            total_index_size += itr.second.index_size;
            segments_encoded_key_bounds.push_back(itr.second.key_bounds);
        }
    }
    for (auto& key_bound : _segments_encoded_key_bounds) {
        segments_encoded_key_bounds.push_back(key_bound);
    }
    if (_segments_key_bounds_truncated.has_value()) {
        rowset_meta->set_segments_key_bounds_truncated(_segments_key_bounds_truncated.value());
    }
    // segment key bounds are empty in old version(before version 1.2.x). So we should not modify
    // the overlap property when key bounds are empty.
    // for mow table with cluster keys, the overlap is used for cluster keys,
    // the key_bounds is primary keys
    if (!segments_encoded_key_bounds.empty() &&
        !is_segment_overlapping(segments_encoded_key_bounds) &&
        _context.tablet_schema->cluster_key_uids().empty()) {
        rowset_meta->set_segments_overlap(NONOVERLAPPING);
    }

    auto segment_num = _num_seg();
    if (check_segment_num && config::check_segment_when_build_rowset_meta) {
        auto segments_encoded_key_bounds_size = segments_encoded_key_bounds.size();
        if (segments_encoded_key_bounds_size != segment_num) {
            return Status::InternalError(
                    "segments_encoded_key_bounds_size should  equal to _num_seg, "
                    "segments_encoded_key_bounds_size "
                    "is: {}, _num_seg is: {}",
                    segments_encoded_key_bounds_size, segment_num);
        }
    }

    rowset_meta->set_num_segments(segment_num);
    rowset_meta->set_num_rows(num_rows_written + _num_rows_written);
    rowset_meta->set_total_disk_size(total_data_size + _total_data_size + total_index_size +
                                     _total_index_size);
    rowset_meta->set_data_disk_size(total_data_size + _total_data_size);
    rowset_meta->set_index_disk_size(total_index_size + _total_index_size);
    rowset_meta->set_segments_key_bounds(segments_encoded_key_bounds);
    // TODO write zonemap to meta
    rowset_meta->set_empty((num_rows_written + _num_rows_written) == 0);
    rowset_meta->set_creation_time(time(nullptr));
    return Status::OK();
}

Status BaseBetaRowsetWriter::_build_tmp(RowsetSharedPtr& rowset_ptr) {
    Status status;
    std::shared_ptr<RowsetMeta> tmp_rs_meta = std::make_shared<RowsetMeta>();
    tmp_rs_meta->init(_rowset_meta.get());

    status = _build_rowset_meta(tmp_rs_meta.get());
    if (!status.ok()) {
        LOG(WARNING) << "failed to build rowset meta, res=" << status;
        return status;
    }

    status = RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, tmp_rs_meta,
                                          &rowset_ptr);
    DBUG_EXECUTE_IF("BaseBetaRowsetWriter::_build_tmp.create_rowset_failed",
                    { status = Status::InternalError("create rowset failed"); });
    if (!status.ok()) {
        LOG(WARNING) << "rowset init failed when build new rowset, res=" << status;
        return status;
    }
    return Status::OK();
}

Status BaseBetaRowsetWriter::_create_file_writer(const std::string& path,
                                                 io::FileWriterPtr& file_writer) {
    io::FileWriterOptions opts = _context.get_file_writer_options();
    Status st = _context.fs()->create_file(path, &file_writer, &opts);
    if (!st.ok()) {
        LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st;
        return st;
    }

    DCHECK(file_writer != nullptr);
    return Status::OK();
}

Status BaseBetaRowsetWriter::create_file_writer(uint32_t segment_id, io::FileWriterPtr& file_writer,
                                                FileType file_type) {
    auto segment_path = _context.segment_path(segment_id);
    if (file_type == FileType::INVERTED_INDEX_FILE) {
        std::string prefix =
                std::string {InvertedIndexDescriptor::get_index_file_path_prefix(segment_path)};
        std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(prefix);
        return _create_file_writer(index_path, file_writer);
    } else if (file_type == FileType::SEGMENT_FILE) {
        return _create_file_writer(segment_path, file_writer);
    }
    return Status::Error<ErrorCode::INTERNAL_ERROR>(
            fmt::format("failed to create file = {}, file type = {}", segment_path, file_type));
}

Status BaseBetaRowsetWriter::create_index_file_writer(uint32_t segment_id,
                                                      IndexFileWriterPtr* index_file_writer) {
    RETURN_IF_ERROR(RowsetWriter::create_index_file_writer(segment_id, index_file_writer));
    // used for inverted index format v1
    (*index_file_writer)->set_file_writer_opts(_context.get_file_writer_options());
    return Status::OK();
}

Status BetaRowsetWriter::create_segment_writer_for_segcompaction(
        std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, int64_t end) {
    DCHECK(begin >= 0 && end >= 0);
    std::string path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
                                                                   _context.rowset_id, begin, end);
    io::FileWriterPtr file_writer;
    RETURN_IF_ERROR(_create_file_writer(path, file_writer));

    IndexFileWriterPtr index_file_writer;
    if (_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) {
        io::FileWriterPtr idx_file_writer;
        std::string prefix(InvertedIndexDescriptor::get_index_file_path_prefix(path));
        if (_context.tablet_schema->get_inverted_index_storage_format() !=
            InvertedIndexStorageFormatPB::V1) {
            std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(prefix);
            RETURN_IF_ERROR(_create_file_writer(index_path, idx_file_writer));
        }
        index_file_writer = std::make_unique<IndexFileWriter>(
                _context.fs(), prefix, _context.rowset_id.to_string(), _num_segcompacted,
                _context.tablet_schema->get_inverted_index_storage_format(),
                std::move(idx_file_writer));
    }

    segment_v2::SegmentWriterOptions writer_options;
    writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write;
    writer_options.rowset_ctx = &_context;
    writer_options.write_type = _context.write_type;
    writer_options.write_type = DataWriteType::TYPE_COMPACTION;
    writer_options.max_rows_per_segment = _context.max_rows_per_segment;
    writer_options.mow_ctx = _context.mow_context;

    *writer = std::make_unique<segment_v2::SegmentWriter>(
            file_writer.get(), _num_segcompacted, _context.tablet_schema, _context.tablet,
            _context.data_dir, writer_options, index_file_writer.get());
    if (auto& seg_writer = _segcompaction_worker->get_file_writer();
        seg_writer != nullptr && seg_writer->state() != io::FileWriter::State::CLOSED) {
        RETURN_IF_ERROR(_segcompaction_worker->get_file_writer()->close());
    }
    _segcompaction_worker->get_file_writer().reset(file_writer.release());
    if (auto& idx_file_writer = _segcompaction_worker->get_inverted_index_file_writer();
        idx_file_writer != nullptr) {
        RETURN_IF_ERROR(idx_file_writer->close());
    }
    _segcompaction_worker->get_inverted_index_file_writer().reset(index_file_writer.release());
    return Status::OK();
}

Status BaseBetaRowsetWriter::_check_segment_number_limit(size_t segnum) {
    DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
                    { segnum = dp->param("segnum", 1024); });
    if (UNLIKELY(segnum > config::max_segment_num_per_rowset)) {
        return Status::Error<TOO_MANY_SEGMENTS>(
                "too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, "
                "_num_segment:{}, rowset_num_rows:{}. Please check if the bucket number is too "
                "small or if the data is skewed.",
                _context.tablet_id, _context.rowset_id.to_string(),
                config::max_segment_num_per_rowset, _num_segment, get_rowset_num_rows());
    }
    return Status::OK();
}

Status BetaRowsetWriter::_check_segment_number_limit(size_t segnum) {
    DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
                    { segnum = dp->param("segnum", 1024); });
    if (UNLIKELY(segnum > config::max_segment_num_per_rowset)) {
        return Status::Error<TOO_MANY_SEGMENTS>(
                "too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, _num_segment:{}, "
                "_segcompacted_point:{}, _num_segcompacted:{}, rowset_num_rows:{}. Please check if "
                "the bucket number is too small or if the data is skewed.",
                _context.tablet_id, _context.rowset_id.to_string(),
                config::max_segment_num_per_rowset, _num_segment, _segcompacted_point,
                _num_segcompacted, get_rowset_num_rows());
    }
    return Status::OK();
}

Status BaseBetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) {
    uint32_t segid_offset = segment_id - _segment_start_id;
    {
        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
        CHECK_EQ(_segid_statistics_map.find(segment_id) == _segid_statistics_map.end(), true);
        _segid_statistics_map.emplace(segment_id, segstat);
        if (segment_id >= _segment_num_rows.size()) {
            _segment_num_rows.resize(segment_id + 1);
        }
        _segment_num_rows[segid_offset] = cast_set<uint32_t>(segstat.row_num);
    }
    VLOG_DEBUG << "_segid_statistics_map add new record. segment_id:" << segment_id
               << " row_num:" << segstat.row_num << " data_size:" << segstat.data_size
               << " index_size:" << segstat.index_size;

    {
        std::lock_guard<std::mutex> lock(_segment_set_mutex);
        _segment_set.add(segid_offset);
        while (_segment_set.contains(_num_segment)) {
            _num_segment++;
        }
    }

    if (_context.mow_context != nullptr) {
        // ensure that the segment file writing is complete
        auto* file_writer = _seg_files.get(segment_id);
        if (file_writer && file_writer->state() != io::FileWriter::State::CLOSED) {
            MonotonicStopWatch close_timer;
            close_timer.start();
            RETURN_IF_ERROR(file_writer->close());
            close_timer.stop();

            auto close_time_ms = close_timer.elapsed_time_milliseconds();
            if (close_time_ms > 1000) {
                LOG(INFO) << "file_writer->close() took " << close_time_ms
                          << "ms for segment_id=" << segment_id
                          << ", tablet_id=" << _context.tablet_id
                          << ", rowset_id=" << _context.rowset_id;
            }
        }
        RETURN_IF_ERROR(_generate_delete_bitmap(segment_id));
    }
    return Status::OK();
}

Status BetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) {
    RETURN_IF_ERROR(BaseBetaRowsetWriter::add_segment(segment_id, segstat));
    return _segcompaction_if_necessary();
}

Status BetaRowsetWriter::flush_segment_writer_for_segcompaction(
        std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t index_size,
        KeyBoundsPB& key_bounds) {
    uint32_t segid = (*writer)->get_segment_id();
    uint32_t row_num = (*writer)->row_count();
    uint64_t segment_size;

    auto s = (*writer)->finalize_footer(&segment_size);
    if (!s.ok()) {
        return Status::Error<WRITER_DATA_WRITE_ERROR>("failed to finalize segment: {}",
                                                      s.to_string());
    }
    int64_t inverted_index_file_size = 0;
    RETURN_IF_ERROR((*writer)->close_inverted_index(&inverted_index_file_size));

    SegmentStatistics segstat;
    segstat.row_num = row_num;
    segstat.data_size = segment_size;
    segstat.index_size = inverted_index_file_size;
    segstat.key_bounds = key_bounds;
    {
        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
        CHECK_EQ(_segid_statistics_map.find(segid) == _segid_statistics_map.end(), true);
        _segid_statistics_map.emplace(segid, segstat);
    }
    VLOG_DEBUG << "_segid_statistics_map add new record. segid:" << segid << " row_num:" << row_num
               << " data_size:" << PrettyPrinter::print_bytes(segment_size)
               << " index_size:" << PrettyPrinter::print_bytes(inverted_index_file_size);

    writer->reset();

    return Status::OK();
}

#include "common/compile_check_end.h"
} // namespace doris
